Add semantic search with Voyage AI embeddings#185
Conversation
Implement `/search` command for semantic search across all stored content. Uses Voyage AI `voyage-3.5-lite` for embeddings with in-memory numpy cosine similarity ranking. Embeddings are generated incrementally during pipeline cycles and cached in memory for fast search.

New files:
- `services/search.py`: SearchService, EmbeddingProvider protocol, VoyageEmbeddingProvider
- `discord/cogs/search.py`: SearchCog with `/search` slash command

Modified files:
- `models.py`: ContentEmbedding model with ORM cascade from ContentItem
- `repository.py`: Embedding CRUD and search query methods
- `pipeline.py`: Optional `embed_pending()` call in `run_cycle`
- `content_posting.py`: Wire SearchService creation when `voyage_api_key` set
- `bot.py`: `search_service` attribute, register SearchCog
- `config.py`: `voyage_api_key`, `search_embedding_model`, threshold, max_results
- `pyproject.toml`: numpy, voyageai dependencies + mypy override
user1303836
left a comment
Thorough review of the semantic search implementation. Overall this is solid work -- clean architecture, good test coverage, and proper integration with existing patterns.
However, there are a few blocking issues that need to be addressed before merge:
Blocking Issues:

- **Zero-norm vector bug in `cosine_similarity`** (search.py:65) - Division by zero when a zero-norm vector is passed. This can happen if an embedding API returns all zeros or if data gets corrupted. Add a guard.
- **`_check_model_consistency()` called on every search** (search.py:102) - This hits the DB on every single search query. Should be called once during cache load or on a timer, not on the hot path.
- **No validation on `days` parameter** (cogs/search.py:45) - Zero or negative values would produce nonsensical results. Add a minimum bound check.
- **Unhandled `ValueError` from `SourceType(source_type)`** (repository.py:945) - If an invalid source_type string is passed, this raises an unhandled ValueError. Needs a try/except or pre-validation.
- **Missing zero-vector test** (test_search.py) - Add a test for the zero-vector edge case in `cosine_similarity` to prevent regression.
Suggestions (non-blocking):

- Consider batching embedding API calls in `embed_pending` instead of one call per item
- Add a `setup()` function at the bottom of `cogs/search.py` for pattern consistency with other cogs
- The hardcoded `dimension = 1024` in VoyageEmbeddingProvider could be a lookup table keyed by model name

See inline comments for details.
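The batching suggestion above can be sketched as a small chunking helper: group pending texts so each group maps to one embeddings API call instead of one call per item. The `chunked`/`plan_batches` names and the chunk size of 128 are illustrative assumptions, not the PR's actual API.

```python
from typing import Iterator


def chunked(items: list[str], size: int = 128) -> Iterator[list[str]]:
    """Yield successive chunks of at most `size` texts."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def plan_batches(texts: list[str], size: int = 128) -> list[list[str]]:
    """Group texts so each group maps to a single embeddings API call."""
    return list(chunked(texts, size))


if __name__ == "__main__":
    batches = plan_batches([f"doc {i}" for i in range(300)], size=128)
    print([len(b) for b in batches])  # three API calls instead of 300
```

Each batch would then be passed as a list to the provider's embed call, which most embedding APIs (including Voyage's) accept natively.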
src/intelstream/services/search.py
Outdated
```python
    limit: int = 5,
    threshold: float = 0.3,
) -> list[SearchResult]:
    await self._check_model_consistency()
```
[Blocking] _check_model_consistency() is called on every search() and every embed_pending() invocation, which means every single search query and every pipeline cycle hits the DB with get_latest_embedding(). This is unnecessary overhead for the hot path.
The design doc envisions this as a startup/initialization check. Consider calling it once on first use (similar to _ensure_cache_loaded) rather than on every call. You could add a _model_checked flag or combine the check into _ensure_cache_loaded.
For embed_pending() it makes some sense to check periodically (operator might change the model while the bot is running), but for search() the model won't change mid-request. At minimum, remove it from search() and rely on the embed_pending() check during the pipeline cycle.
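The "check once on first use" pattern suggested here can be sketched with a simple flag; `_model_checked` and the class internals below are illustrative names under that assumption, not the PR's actual code.

```python
import asyncio


class SearchServiceSketch:
    """Minimal sketch: gate an expensive consistency check behind a flag."""

    def __init__(self) -> None:
        self._model_checked = False
        self.check_calls = 0

    async def _check_model_consistency(self) -> None:
        self.check_calls += 1  # stands in for the real DB round-trip

    async def _ensure_model_checked(self) -> None:
        if not self._model_checked:
            await self._check_model_consistency()
            self._model_checked = True

    async def search(self, query: str) -> None:
        await self._ensure_model_checked()  # no DB hit after the first call


async def main() -> None:
    svc = SearchServiceSketch()
    for _ in range(5):
        await svc.search("q")
    print(svc.check_calls)  # 1


if __name__ == "__main__":
    asyncio.run(main())
```

The same flag could live inside `_ensure_cache_loaded` instead of a separate method, as suggested above.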
src/intelstream/services/search.py
Outdated
```python
def cosine_similarity(query: np.ndarray, candidates: np.ndarray) -> np.ndarray:
    query_norm = query / np.linalg.norm(query)
```
[Blocking - Bug] Zero-norm vector will cause a division-by-zero here, producing NaN values. While unlikely with real embedding models, a corrupted or empty embedding in the DB could trigger this.
If the query embedding API returns a zero vector (e.g., for an empty or nonsensical input), or if a stored candidate embedding is all zeros due to a previous failure, you'll get RuntimeWarning: invalid value encountered in divide and NaN scores that will silently pass the threshold check.
Add a guard:
```python
query_norm_val = np.linalg.norm(query)
if query_norm_val == 0:
    return np.zeros(candidates.shape[0])
query_norm = query / query_norm_val
cand_norms = np.linalg.norm(candidates, axis=1, keepdims=True)
cand_norms = np.maximum(cand_norms, 1e-10)  # avoid div by zero
candidates_norm = candidates / cand_norms
```
```python
@property
def dimension(self) -> int:
    return 1024
```
[Blocking - Bug] The dimension property returns 1024 as a hardcoded value, but the actual dimension returned by Voyage AI depends on the model and any output dimension configuration. The voyage-3.5-lite model defaults to 1024, but if someone configures a different model via search_embedding_model (e.g., voyage-3.5 which also defaults to 1024 but could be configured differently), this hardcoded value would be wrong.
More importantly, this property is part of the EmbeddingProvider protocol and could mislead callers into thinking it's the actual dimension. Since it's not currently used anywhere, this is low-risk right now, but it should either:
- Be removed from the protocol if unused, or
- Be made accurate (query the actual model dimension)
For now, since the property isn't used, I'd accept leaving it but adding a note that it's the default and may not reflect configured output dimensions.
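The lookup-table suggestion could look like the sketch below. The two entries are the defaults stated in this review (both models default to 1024); treat the table contents as assumptions to verify against the Voyage AI docs before relying on them.

```python
# Default embedding dimensions per model name (values per this review;
# verify against provider documentation before extending).
_MODEL_DIMENSIONS: dict[str, int] = {
    "voyage-3.5-lite": 1024,
    "voyage-3.5": 1024,
}


def model_dimension(model: str, default: int = 1024) -> int:
    """Return the default embedding dimension for a known model name,
    falling back to `default` for unknown models."""
    return _MODEL_DIMENSIONS.get(model, default)
```

This keeps the `dimension` property accurate for configured models without a network call, at the cost of maintaining the table.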
```python
def __init__(self, bot: IntelStreamBot) -> None:
    self.bot = bot

async def cog_load(self) -> None:
```
[Suggestion] The cog_load() method logs when Voyage is not configured but doesn't do anything functional. Since the command-time check at line 48 handles the unconfigured case, this cog_load() override is fine but purely cosmetic. No change needed -- just noting for clarity.
However, there's a missing setup() function at the bottom of the file. Every other cog in the codebase has:
```python
async def setup(bot: "IntelStreamBot") -> None:
    await bot.add_cog(SearchCog(bot))
```

While it's not strictly required since bot.py imports and registers the cog directly, the pattern is used consistently across all other cog files. Worth adding for consistency.
```python
    self,
    interaction: discord.Interaction,
    query: str,
    days: int | None = None,
```
[Blocking] The days parameter is not validated. A user could pass days=0 which would create a since timestamp equal to datetime.now(UTC), returning essentially nothing (or only items published this exact second). Worse, a negative value like days=-1 would set since to a future date, also returning nothing but for confusing reasons.
Add validation:

```python
if days is not None and days < 1:
    await interaction.response.send_message(
        "Days must be at least 1.", ephemeral=True
    )
    return
```

Also consider an upper bound (e.g., `le=3650` or similar) to prevent absurd values.
```python
async with self.session() as session:
    query = (
        select(ContentEmbedding, ContentItem, Source)
        .join(ContentItem, ContentEmbedding.content_item_id == ContentItem.id)
```
[Blocking] SourceType(source_type) here will raise ValueError if the user passes an invalid source type string that doesn't map to a SourceType enum value. This would cause an unhandled exception that bubbles up through the search path.
The search cog uses @app_commands.choices which constrains the Discord UI, but a malformed API request could still pass an invalid string. The exception would be caught by the cog's broad except Exception handler, which would show a generic error message -- acceptable but not ideal for UX.
Consider wrapping in a try/except in either the repository method or the `_get_candidates` caller:

```python
try:
    query = query.where(Source.type == SourceType(source_type))
except ValueError:
    return []  # invalid source_type = no results
```

src/intelstream/services/content_posting.py

```python
    max_input_length=self.bot.settings.summary_max_input_length,
)

search_service = None
```
[Good] Clean wiring. The conditional import and creation of SearchService only when voyage_api_key is configured matches the existing pattern for optional features. Setting self.bot.search_service here before passing to the pipeline is correct and ensures the cog registered later (SearchCog) can access it.
One minor note: the search_service local variable is set to None or the service, then assigned to both self.bot.search_service and passed to ContentPipeline. If the pipeline is later closed and recreated (unlikely but possible), the bot.search_service reference would still point to the original instance. This is fine for the current architecture.
```python
        continue
    return count

async def _ensure_cache_loaded(self) -> None:
```
[Blocking] _ensure_cache_loaded() loads ALL embeddings from the database into memory on first search. This is correct per the design, but there's a subtle race condition: if embed_pending() is running concurrently (from the pipeline cycle) while a user triggers a search, the cache could be loaded in an incomplete state.
Since Python's asyncio is single-threaded, this isn't a real concurrency issue -- the await points won't interleave within a single coroutine's logic. So this is actually fine. Disregard.
However, there IS a real issue: after _check_model_consistency() clears the cache and sets _cache_loaded = False (line 154), the subsequent _get_candidates() call will reload the cache. But if model consistency was just checked and all embeddings were cleared, the reload will load an empty set. Then embed_pending() will re-embed items, adding them to the cache. On the NEXT search, _cache_loaded is already True, so the cache won't reload -- but the cache was populated incrementally by embed_pending(), so this actually works correctly.
OK, I've convinced myself the logic is sound. No change needed.
```python
candidates = np.array([[-1.0, 0.0, 0.0]], dtype=np.float32)
scores = cosine_similarity(v, candidates)
assert scores[0] < -0.99
```
[Blocking - Missing test] There's no test for cosine_similarity with a zero-norm vector. Given the bug I flagged in the production code, add a test that verifies the behavior when a zero vector is passed:
```python
def test_zero_vector_does_not_crash(self):
    v = np.array([0.0, 0.0, 0.0], dtype=np.float32)
    candidates = np.array([[1.0, 0.0, 0.0]], dtype=np.float32)
    # Should not raise, should return zeros or handle gracefully
    scores = cosine_similarity(v, candidates)
    assert not np.any(np.isnan(scores))
```

This test will currently FAIL (proving the bug), and should pass after the fix.
```diff
@@ -0,0 +1,350 @@
+import json
```
[Suggestion - Missing test coverage] The design doc specifies testing for:

- `test_backfill_processes_items_incrementally` - partially covered by `test_embed_pending_processes_items_without_embeddings` but doesn't verify that items are processed in `created_at desc` order or that the 100-item limit is respected
- `test_generate_embedding_stores_in_repository` - covered, good

Missing test from the design doc spec:

- `test_embedding_text_combines_title_summary_and_metadata` - this IS covered in `TestBuildEmbeddingText`, good

Overall the test coverage is solid. The main gap is the zero-vector test mentioned above. Not blocking beyond that.
…h, input validation

- Guard cosine_similarity against zero-norm query and candidate vectors
- Move _check_model_consistency into _ensure_cache_loaded (once on first search)
- Validate days parameter rejects zero/negative values in SearchCog
- Handle invalid source_type in get_embeddings_with_items (return empty list)
- Add tests for zero-vector cosine similarity and days validation
user1303836
left a comment
Re-reviewed commit 779d47e. All 4 blocking issues and the test gap are properly addressed:

- **Zero-norm vectors**: Clean fix using early return for zero query norm and `np.where` for zero candidate norms. Both produce correct zero scores without division errors.
- **Model consistency**: Correctly moved into `_ensure_cache_loaded()` so it runs once on first cache load. Note: `embed_pending()` no longer checks consistency, meaning a model change could cause a few wasted embed calls before the next search triggers cleanup. This is an acceptable tradeoff -- rare case, minor inefficiency, no correctness issue.
- **Days validation**: `days < 1` check before `defer()`, clean error message. Tests cover 0 and -5.
- **Invalid source_type**: Minimal try/except returning empty list. Correct.
- **Tests**: 4 new tests covering both zero-vector cases and both days validation cases.
All changes are surgical and well-scoped. LGTM -- this PR is ready to merge.
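The merged fix as described above (early return for a zero query norm, `np.where` for zero candidate norms) can be sketched like this; the actual merged code may differ in detail.

```python
import numpy as np


def cosine_similarity(query: np.ndarray, candidates: np.ndarray) -> np.ndarray:
    """Cosine similarity of `query` (d,) against `candidates` (n, d),
    returning zero scores instead of NaN for zero-norm vectors."""
    query_norm_val = np.linalg.norm(query)
    if query_norm_val == 0:
        # Zero query vector: no meaningful direction, score everything 0.
        return np.zeros(candidates.shape[0], dtype=np.float32)
    query_norm = query / query_norm_val
    cand_norms = np.linalg.norm(candidates, axis=1, keepdims=True)
    # Zero-norm candidates get a safe divisor of 1; their rows are all
    # zeros, so their scores come out as exactly 0.
    safe_norms = np.where(cand_norms == 0, 1.0, cand_norms)
    candidates_norm = candidates / safe_norms
    return candidates_norm @ query_norm
```

With this shape, the zero-vector regression test from the review passes without any `RuntimeWarning` from numpy.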
Summary

- `/search` slash command for semantic search across all stored content (Substack, YouTube, RSS, ArXiv, Blog, Twitter, Page)
- `voyage-3.5-lite` embeddings with in-memory numpy cosine similarity for ranking
- Embeddings generated incrementally during pipeline cycles via `embed_pending()`, with automatic backfill of existing content
- `ContentEmbedding` database model with ORM cascade deletion chain (Source -> ContentItem -> ContentEmbedding)
- `EmbeddingProvider` protocol for swappable embedding backends
- Search is only enabled when `VOYAGE_API_KEY` is configured; gracefully disabled otherwise

Key files

- `src/intelstream/services/search.py`
- `src/intelstream/discord/cogs/search.py`
- `src/intelstream/database/models.py`
- `src/intelstream/database/repository.py`
- `src/intelstream/services/pipeline.py`
- `src/intelstream/config.py`

Test plan